-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[core] Add option to support reading sequence_number in AuditLogTable and BinlogTable #6933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
8bc109b to
1e104c6
Compare
yunfengzhou-hub
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Left some comments as below.
| <td><h5>changelog-read.sequence-number.enabled</h5></td> | ||
| <td style="word-wrap: break-word;">false</td> | ||
| <td>Boolean</td> | ||
| <td>Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables.</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The output result of an audit log table can be controlled by incremental-between-scan-mode, where changelog is only one of the options. So we might better not include the word "changelog" in the config option name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to table-read.sequence-number.enabled for clearer semantics, though I'm not sure if this will be supported on regular tables in the future. If it is, the scope of the parameter would become limited.
|
|
||
| public AuditLogTable(FileStoreTable wrapped) { | ||
| this.wrapped = wrapped; | ||
| this.wrapped.schema().options().put(AUDIT_LOG_ENABLED, "true"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about organize the two new configurations in the following way?
- User sets config A.
- In
AuditLogTable, the code detects whether config A is set. If so, set config B to true. - In
ValueContentRowDataRecordIterator, the code only checks config B, instead of configA && configB.
The benefit of such structure is that it can decouple system table concepts from ValueContentRowDataRecordIterator. config B need not be related to audit log table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. The parameters are now aggregated and updated in AuditLogTable, and ValueContentRowDataRecordIterator is only aware of the addSequenceNumberFirst behavior.
The current parameter is AUDIT_ADD_SEQUENCE_NUMBER_FIRST, which clearly reflects what it does (the naming can be discussed further, but it's an internal parameter so it's easy to adjust).
|
|
||
| private final FileStoreTable wrapped; | ||
| /** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */ | ||
| protected final int specialFieldCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about introducing List<SpecialField> specialFields? If so, we will be able to change
fields.add(SpecialFields.ROW_KIND);
if (specialFieldCount > 1) {
fields.add(SpecialFields.SEQUENCE_NUMBER);
}into
fields.add(specialFields);There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to List<SpecialField> specialFields
| // _SEQUENCE_NUMBER is at index 0 in bottom output | ||
| return row.getLong(0); | ||
| } | ||
| return super.getLong(pos); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If _SEQUENCE_NUMBER is at index 0, then the other fields should be acquired through pos - 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mapping logic is implemented in AuditLogRead.defaultProjection() (for select *) and AuditLogRead.buildProjection(RowType readType) (for selecting specific fields).
| public long getLong(int pos) { | ||
| int index = indexMapping[pos]; | ||
| if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) { | ||
| // _SEQUENCE_NUMBER is at index 0 in bottom output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason behind this comment is located in another class(ValueContentRowDataRecordIterator), and these two classes are mainly associated with AUDIT_LOG_ENABLED, which shows a generic concept not related to sequence number.
Thus it might increase the other developer's burden to understand why "_SEQUENCE_NUMBER is at index 0". We might need to think about how to increase code readability here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've optimized the _SEQUENCE_NUMBER index logic in defaultProjection() and buildProjection(RowType readType), so getLong no longer needs special handling here.
| write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5)); | ||
| write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5)); | ||
| write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6)); | ||
| write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change seems unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original test case is incorrect - the primary keys for RowKind.UPDATE_BEFORE and RowKind.UPDATE_AFTER are inconsistent. This scenario should not occur in normal circumstances.
| expectedRow.add( | ||
| GenericRow.of( | ||
| BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5)); | ||
| BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the original code, the expected result contains 4 rows. Here only three rows are left, missing an UPDATE_BEFORE row.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original test case is incorrect - the primary keys for RowKind.UPDATE_BEFORE and RowKind.UPDATE_AFTER are inconsistent. This scenario should not occur in normal circumstances.
| // Create primary key table with changelog-read.sequence-number.enabled option | ||
| sql( | ||
| "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) " | ||
| + "WITH ('changelog-read.sequence-number.enabled'='true');"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Despite that users are allowed to set changelog-read.sequence-number.enabled when creating or altering table, a more recommended way is to configure it dynamically in the SELECT query through Flink SQL Hints. It should be more suitable for such configurations that are only useful in specific read queries. The usage of SQL hints is like follows
SELECT * FROM `test_table_seq$audit_log`/*+ OPTIONS('changelog-read.sequence-number.enabled' = 'true') */;Let's add test cases for this use case as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also add some test cases to verify what will happen if changelog-read.sequence-number.enabled is configured on append-only tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new parameter cannot be applied to SQL hints, because the rowType passed by the operator is determined before obtaining dynamic options.
- Before processing SQL hint parameters, the Flink job first constructs the
AuditLogTableto get therowtypefor building upstream/downstreamrowtypetransmission information. - Later, when reading
AuditLogTable, it copies thedynamicOptions. - At this point, the operator's
rowtype(withoutSEQUENCE_NUMbefore obtaining dynamic parameters) does not match the task's processingrowtype(withSEQUENCE_NUM).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In short, for select *, the operator's rowtype is generated before dynamic options are applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added dynamicOptions detection to prevent users from accidentally misusing it in SQL hints.
public Table copy(Map<String, String> dynamicOptions) {
if (Boolean.parseBoolean(
dynamicOptions.getOrDefault(
TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "false"))) {
throw new UnsupportedOperationException(
"table-read.sequence-number.enabled is not supported by hint.");
}
return new AuditLogTable(wrapped.copy(dynamicOptions));
}d06c910 to
2259c82
Compare
2259c82 to
c2cd7d1
Compare
Purpose
This PR adds support for reading
_SEQUENCE_NUMBERfield inaudit_logandbinlogsystem tables for primary key tables.A new configuration option
changelog-read.sequence-number.enabled(default:false) is introduced to control whether to include the_SEQUENCE_NUMBERfield in the output schema of audit_log and binlog system tables. When enabled, users can access the internal sequence number that Paimon uses for ordering records within primary key tables.Key changes:
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLEDconfigurationAuditLogTableandBinlogTableto optionally include_SEQUENCE_NUMBERfield in their row typesValueContentRowDataRecordIteratorto output sequence number when enabledTests
AuditLogTableTest- basic auditlog table reading and sequence number with table optionBinlogTableTest- basic binlog table reading and sequence number with table optionBatchFileStoreITCase- Flink integration test for audit_log and binlog reading sequence numberAPI and Format
changelog-read.sequence-number.enabled(Boolean, default:false)audit_logandbinlogsystem tables will have an additional_SEQUENCE_NUMBERcolumn afterrowkindDocumentation
core_configuration.html